认证组件


只要继承了 APIView 就可以使用认证组件

认证: 每次请求都要带着token(即: 字符串)来进行验证,如果认证成功则返回相应的数据

组件的执行顺序:  认证 -> 权限 -> 频率

1. 认证类的创建

  • 认证类的固定写法一 -> 没有继承 BaseAuthentication 认证类

    • 如果没有继承 BaseAuthentication 认证类,那么就需要在认证类中编写两个方法

      • authenticate -> 编写认证的逻辑代码
      • authenticate_header -> 不知道有啥用,反正就是要写上 rest-framework 的源码规定的,否则就会报错 

    • 注意:

      • authenticate 方法必须有返回值

      • 当验证失败的时候(即: if Fasle 的时候)要主动抛出异常,因为 rest-framework 是通过捕获异常来进行处理的

    • authenticate 方法返回值说明:

      • 返回 两个值 或者一个列表/元祖中包含两个值

        • rest-framework 会将第一个值赋值给 request.user,第二个值会赋值给 request.auth,然后在视图类中就可以通过 request.user/auth 分别获取这两个值

      • 返回 None

        • 跳出当前认证,执行下一个认证

      • 返回 异常

        • 跳出当前认证,抛出异常信息

# rf_auth.py

from rest_framework import exceptions


class 认证类的类名(object):

    def authenticate(self, request):
# 认证的逻辑代码

        if True/False:
            return 返回值一返回值二
elif xxx:
            return None  # 跳出本次认证
        else:
# raise exceptions.AuthenticationFailed("验证失败!")  # 设置报错信息
raise exceptions.AuthenticationFailed({'code': 1000, 'msg': '验证失败'})  # 设置报错信息

    def authenticate_header(self, request):
        pass

  • 认证类的固定写法二 -> 继承 BaseAuthentication 类 -> 常用

    • BaseAuthentication 认证类其实就是一个初始化好的认证类

    • BaseAuthentication 认证类里面也有 authenticate 和 authenticate_header 方法,只不过这两个方法里面没有任何逻辑代码

    • 在编写认证类的时候可以继承 BaseAuthentication 认证类,然后重写 authenticate 方法,而自己编写的认证类就无需写 authenticate_header 方法了,因为 BaseAuthentication 认证类里面有

    • 注意:

      • authenticate 方法必须有返回值

      • 当验证失败的时候(即: if Fasle 的时候)要主动抛出异常,因为 rest-framework 是通过捕获异常来进行处理的

    • authenticate 方法返回值说明:

      • 返回 两个值 或者一个列表/元祖中包含两个值

        • rest-framework 会将第一个值赋值给 request.user,第二个值会赋值给 request.auth,然后在视图类中就可以通过 request.user/auth 分别获取这两个值

      • 返回 None

        • 跳出当前认证,执行下一个认证

      • 返回 异常

        • 跳出当前认证,抛出异常信息

# rf_auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions


class 认证类的类名(BaseAuthentication):

    def authenticate(self, request):
# 认证的逻辑代码

        if True/False:
            return 返回值一返回值二
elif xxx:
            return None  # 跳出本次认证
        else:
# raise exceptions.AuthenticationFailed("验证失败!")  # 设置报错信息
raise exceptions.AuthenticationFailed({'code': 1000, 'msg': '验证失败'})  # 设置报错信息

  • 认证类的例子

# rf_auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
from .models import *


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get("token")  # 从get请求中获取token值进行判断
        token_obj = Token.objects.filter(token=token).first()  # 判断是否有当前token值的记录
        if token_obj:
            return token_obj.user, token_obj.token
        else:
# raise exceptions.AuthenticationFailed("验证失败!")  # 设置报错信息
raise exceptions.AuthenticationFailed({'code': 1000, 'msg': '验证失败'})  # 设置报错信息

3. 局部认证

  • 局部认证作用于当前视图的所有请求

  • 局部认证的配置(即: authentication_classes = [认证类, 认证类])会覆盖掉该视图类在全局认证中的配置

  • authentication_classes = [认证类, 认证类]

# rf_auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
from .models import *


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get("token")  # 从get请求中获取token值进行判断
        token_obj = Token.objects.filter(token=token).first()  # 判断是否有当前token值的记录
        if token_obj:
            return token_obj.user, token_obj.token
        else:
# raise exceptions.AuthenticationFailed("验证失败!")  # 设置报错信息
raise exceptions.AuthenticationFailed({'code': 1000, 'msg': '验证失败'})  # 设置报错信息

# views.py

import hashlib
import time

from django.shortcuts import render, HttpResponse
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import *
from .serializer import *
from .rf_auth import *

from rest_framework import viewsets


class TestRequestView(APIView):
authentication_classes = [TokenAuth]  # 局部认证,只作用于当前视图

    def get(self, request):
        print(request.user)  # Kevin -> user对象 -> TokenAuth 认证类下 authenticate 方法的返回值的第一个值
        print(request.auth)  # aec82e5c956da440c11651d0e4dfd052 -> token 值 -> TokenAuth 认证类下 authenticate 方法的返回值的第二个值
        return Response('测试 request.user/auth 的值')


class BookViewSet(viewsets.ModelViewSet):
authentication_classes = [TokenAuth]  # 局部认证,只作用于当前视图下的所有请求

    queryset = Book.objects.all()
    serializer_class = BookSerializers


class PublishViewSet(viewsets.ModelViewSet):
authentication_classes = [TokenAuth]  # 局部认证,只作用于当前视图下的所有请求

    queryset = Publish.objects.all()
    serializer_class = PublishSerializers


# 获取以用户名作为盐,以当前时间作为加密内容的字符串token
def get_token_str(username):
    now_time = str(time.time())
    md5 = hashlib.md5(bytes(username, encoding="utf8"))  # 加盐,使用用户名作为盐,保证token的唯一性
    md5.update(bytes(now_time, encoding='utf-8'))
    return md5.hexdigest()


# 登陆,并且返回当前用户的token
class LoginView(APIView):
    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = User.objects.filter(username=username, password=password).first()
        ret = {
            'status': 0,
            'msg': None
        }
        if user:
            token_str = get_token_str(user.username)  # 获取Token值
# 每次登陆成功后都要修改当前用户的token值
            Token.objects.update_or_create(user=user, defaults={'token': token_str})  # 如果有 user=user 这条数据,那么就修改 token 值,否则就进行添加
            ret['status'] = 1
            ret['token'] = token_str
        else:
            ret['msg'] = '登陆失败'

        return Response(ret)

# 接口说明: 登陆,获取token值

# 接口: http://127.0.0.1:8000/login/

# 请求类型: POST

# 发送数据的编码格式: application/json -> 即: json 格式

# 所要发送的数据: {"username":"Kevin","password":"123"}

# 结果:

{
    "status": 1,
    "msg": null,
    "token": "4a70be5742f536bd78dad811c5044827"
}

# --------------------------------------------------------------

# 接口说明: 查看所有书籍(带token值)

# 接口: http://127.0.0.1:8000/book/?token=4a70be5742f536bd78dad811c5044827

# 请求类型: GET

# 结果:

[
    {
        "id": 2,
        "title": "三体(第二部)",
        "price": 100,
        "pub_date": "2012-12-12",
        "publish": 1,
        "authors": [
            1,
            2
        ]
    },

……
]

# --------------------------------------------------------------

# 接口说明: 查看所有书籍(不带token值)

# 接口: http://127.0.0.1:8000/book/

# 请求类型: GET

# 结果:

{
    "detail": "验证失败!"
}

3. 局部请求认证

  • 局部请求认证作用于当前视图的某几个请求

  • 通过重写 rest-framework 源码中 get_authenticators 方法来实现局部请求认证

  • get_authenticators 返回值格式: [认证类(), 认证类(), 认证类(), ……]

# rf_auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
from .models import *


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get("token")  # 从get请求中获取token值进行判断
        token_obj = Token.objects.filter(token=token).first()  # 判断是否有当前token值的记录
        if token_obj:
            return token_obj.user, token_obj.token
        else:
# raise exceptions.AuthenticationFailed("验证失败!")  # 设置报错信息
raise exceptions.AuthenticationFailed({'code': 1000, 'msg': '验证失败'})  # 设置报错信息

# views.py

import hashlib
import time

from django.shortcuts import render, HttpResponse
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import *
from .serializer import *
from .rf_auth import *

from rest_framework import viewsets

from rest_framework.authentication import SessionAuthentication
from rest_framework.authentication import BasicAuthentication


class BookView(APIView):

局部请求认证,只作用于当前视图的某几个请求
    def get_authenticators(self):  # 通过重写 rest-framework 的 get_authenticators 方法从而实现局部请求认证
        if self.request.method == 'POST':  # 当请求是 POST 请求的时候,返回自定义的认证类,否则返回 rest-framework 默认的两个认证类
            return [TokenAuth(), ]  # 自定义的认证类
        else:
            return [SessionAuthentication(), BasicAuthentication()# rest-framework 默认返回的两个认证类

    def get(self, request, *args, **kwargs):
        book_queryset = Book.objects.all()
        ser = BookSerializers(instance=book_queryset, many=True)
        return Response(ser.data)

    def post(self, request):
        ser = BookSerializers(data=request.data)
        if ser.is_valid():
            ser.save()
            return Response(ser.data)
        else:
            return Response(ser.errors)


# 获取以用户名作为盐,以当前时间作为加密内容的字符串token
def get_token_str(username):
    now_time = str(time.time())
    md5 = hashlib.md5(bytes(username, encoding="utf8"))  # 加盐,使用用户名作为盐,保证token的唯一性
    md5.update(bytes(now_time, encoding='utf-8'))
    return md5.hexdigest()


# 登陆,并且返回当前用户的token
class LoginView(APIView):
    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = User.objects.filter(username=username, password=password).first()
        ret = {
            'status': 0,
            'msg': None
        }
        if user:
            token_str = get_token_str(user.username)  # 获取Token值
# 每次登陆成功后都要修改当前用户的token值
            Token.objects.update_or_create(user=user, defaults={'token': token_str})  # 如果有 user=user 这条数据,那么就修改 token 值,否则就进行添加
            ret['status'] = 1
            ret['token'] = token_str
        else:
            ret['msg'] = '登陆失败'

        return Response(ret)

# 接口说明: 登陆,获取token值

# 接口: http://127.0.0.1:8000/login/

# 请求类型: POST

# 发送数据的编码格式: application/json -> 即: json 格式

# 所要发送的数据: {"username":"Kevin","password":"123"}

# 结果:

{
    "status": 1,
    "msg": null,
    "token": "4a70be5742f536bd78dad811c5044827"
}

# --------------------------------------------------------------

# 接口说明: 查看所有书籍

# 接口: http://127.0.0.1:8000/book/

# 请求类型: GET

# 结果:

[
    {
        "id": 2,
        "title": "三体(第二部)",
        "price": 100,
        "pub_date": "2012-12-12",
        "publish": 1,
        "authors": [
            1,
            2
        ]
    },

……
]

# --------------------------------------------------------------

# 接口说明: 添加书籍(带token值)

# 接口: http://127.0.0.1:8000/book/?token=4a70be5742f536bd78dad811c5044827

# 请求类型: POST

# 结果:

{
    "id": 3,
    "title": "七大罪",
    "price": 50,
    "pub_date": "2020-04-08",
    "production_date": "2020-04-08T07:06:16Z",
    "publish": 1,
    "authors": [
        1,
        2
    ]
}

# --------------------------------------------------------------

# 接口说明: 添加书籍(不带token值)

# 接口: http://127.0.0.1:8000/book/

# 请求类型: POST

# 结果:

{
    "msg": "验证失败",
    "code": "1000"
}

4. 全局认证

  • 全局认证作用于所有视图类

# rf_auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
from .models import *


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get("token")  # 从get请求中获取token值进行判断
        token_obj = Token.objects.filter(token=token).first()  # 判断是否有当前token值的记录
        if token_obj:
            return token_obj.user, token_obj.token
        else:
# raise exceptions.AuthenticationFailed("验证失败!")  # 设置报错信息
raise exceptions.AuthenticationFailed({'code': 1000, 'msg': '验证失败'})  # 设置报错信息

# settings.py

# 配置全局认证所需要的认证类的路径
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": ["app01.rf_auth.TokenAuth"]
}

  • 不进行认证的视图类设置

    • 如果设置了全局认证,但是又想某些视图不进行认证,那么可以在不想进行认证的视图类中设置 authentication_classes = [],因为在 rest-framework 源码中 局部认证的配置 会覆盖掉 全局认证的配置

# views.py

import hashlib
import time

from django.shortcuts import render, HttpResponse
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import *
from .serializer import *
from .rf_auth import *

from rest_framework import viewsets


class BookViewSet(viewsets.ModelViewSet):
    queryset = Book.objects.all()
    serializer_class = BookSerializers


class PublishViewSet(viewsets.ModelViewSet):
    queryset = Publish.objects.all()
    serializer_class = PublishSerializers


# 获取以用户名作为盐,以当前时间作为加密内容的字符串token
def get_token_str(username):
    now_time = str(time.time())
    md5 = hashlib.md5(bytes(username, encoding="utf8"))  # 加盐,使用用户名作为盐,保证token的唯一性
    md5.update(bytes(now_time, encoding='utf-8'))
    return md5.hexdigest()


# 登陆,并且返回当前用户的token
class LoginView(APIView):
authentication_classes = [] # 在全局认证下,当前视图不进行认证

    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = User.objects.filter(username=username, password=password).first()
        ret = {
            'status': 0,
            'msg': None
        }
        if user:
            token_str = get_token_str(user.username)  # 获取Token值
# 每次登陆成功后都要修改当前用户的token值
            Token.objects.update_or_create(user=user, defaults={'token': token_str})  # 如果有 user=user 这条数据,那么就修改 token 值,否则就进行添加
            ret['status'] = 1
            ret['token'] = token_str
        else:
            ret['msg'] = '登陆失败'

        return Response(ret)